package page

import java.util

import com.alibaba.fastjson.{JSON, JSONObject}
import day02.UserActionAnalyze.getActionRDDByDateRange
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.{Row, SparkSession}
import sessionanalyze.constant.Constants
import sessionanalyze.dao.IPageSplitConvertRateDAO
import sessionanalyze.dao.factory.DAOFactory
import sessionanalyze.dao.impl.PageSplitConvertRateDAOImpl
import sessionanalyze.domain.PageSplitConvertRate
import sessionanalyze.test.MockData
import sessionanalyze.util.{DateUtils, NumberUtils, ParamUtils}

import scala.collection.mutable
import scala.collection.mutable.ListBuffer

/*
                    .::::.
                  .::::::::.
                 :::::::::::
             ..:::::::::::'	  FUCK YOU
           '::::::::::::'		Goddess bless, never BUG
             .::::::::::
        '::::::::::::::..
             ..::::::::::::.
           ``::::::::::::::::
            ::::``:::::::::'        .:::.
           ::::'   ':::::'       .::::::::.
         .::::'      ::::     .:::::::'::::.
        .:::'       :::::  .:::::::::' ':::::.
       .::'        :::::.:::::::::'      ':::::.
      .::'         ::::::::::::::'         ``::::.
  ...:::           ::::::::::::'              ``::.
 ```` ':.          ':::::::::'                  ::::..
                    '.:::::'                    ':'````..
                    
 ━━━━━━━━━━━━━━━━━━━━ 女神保佑,永无BUG ━━━━━━━━━━━━━━━━━━━━
*/
/**
 * Spark job that computes the one-step page conversion rate for a configured page flow.
 *
 * The task parameter `Constants.PARAM_TARGET_PAGE_FLOW` is read as a comma-separated list of
 * page ids (e.g. "1,2,3"). Every adjacent pair of that list ("1_2", "2_3") is a "page split".
 * For each split the job computes
 * (number of times the split occurs inside a session) / (number of visits to the split's first page)
 * and stores the result through the page-split-convert-rate DAO.
 */
object PageOneStepConvertRate {

  // Column positions inside an action Row, as this job reads them:
  // index 2 is read as the session id (String), index 3 as the page id (Long)
  // and index 4 as the action time (String handed to DateUtils.parseTime).
  private final val SessionIdIdx = 2
  private final val PageIdIdx = 3
  private final val ActionTimeIdx = 4

  /**
   * Job entry point.
   *
   * @param args command line arguments; the task id is resolved from them by
   *             `ParamUtils.getTaskIdFromArgs`
   */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName(Constants.SPARK_APP_NAME_PAGE).setMaster("local")
    val sc = new SparkContext(conf)
    // Build the session from the same configuration as the context so both carry the page-job name.
    val sparkSession = SparkSession.builder().config(conf).getOrCreate()

    try {
      // Generate mock data.
      MockData.mock(sc, sparkSession)

      // Resolve the task id and load the matching task.
      val taskId = ParamUtils.getTaskIdFromArgs(args, Constants.SPARK_LOCAL_TASKID_PAGE)
      Option(DAOFactory.getTaskDAO.findById(taskId)) match {
        case None =>
          println("没有获取到对应的Task信息")

        case Some(task) =>
          // Parse the parameters of the current task.
          val taskParam = JSON.parseObject(task.getTaskParam)
          // Load the action rows for the task parameters.
          val actionRDD: RDD[Row] = getActionRDDByDateRange(sparkSession, taskParam)

          // Key every action by its session id and group the actions of each session.
          val sessionIdActionRDD: RDD[(String, Row)] =
            actionRDD.map(row => (row.getString(SessionIdIdx), row))
          // Mark the grouped RDD as cached BEFORE the first action so that the count below
          // materialises the cache and the later jobs reuse it instead of re-running the shuffle.
          val groupedSessionIdActionRDD: RDD[(String, Iterable[Row])] =
            sessionIdActionRDD.groupByKey().cache()
          println(groupedSessionIdActionRDD.count())

          // One (split, 1) record per occurrence of a target split inside a session.
          val pageSplitRDD: RDD[(String, Long)] = getPageSplit(sc, groupedSessionIdActionRDD, taskParam)

          // split -> number of occurrences.
          val pageSplitMap: collection.Map[String, Long] = pageSplitRDD.countByKey()

          // page id -> number of visits, for every page of the target flow.
          val startPageVisitCount: mutable.HashMap[Long, Long] =
            getStartPageVisitCount(sc, groupedSessionIdActionRDD, taskParam)

          // One-step conversion rate per split.
          val pageSplitConvertRate = computePageSplitConvertRate(startPageVisitCount, pageSplitMap, taskParam)

          // Persist the result.
          insertConvertRateDB(taskId, pageSplitConvertRate)
      }
    } finally {
      // Always release Spark resources, including when no task was found or a step failed.
      sparkSession.stop()
    }
  }

  /**
   * Serialises the conversion rates and inserts them through the page-split-convert-rate DAO.
   *
   * Each entry is written as `split=rate|` (the trailing separator is kept), in the iteration
   * order of the map.
   *
   * @param taskId               id of the task the result belongs to
   * @param pageSplitConvertRate split -> conversion rate
   */
  def insertConvertRateDB(taskId: Long, pageSplitConvertRate: mutable.HashMap[String, Double]): Unit = {
    val serialized = new StringBuilder
    for ((split, rate) <- pageSplitConvertRate) serialized.append(s"$split=$rate|")

    val record = new PageSplitConvertRate
    record.setTaskid(taskId)
    record.setConvertRate(serialized.toString)
    DAOFactory.getPageSplitConvertRateDAO.insert(record)
    println("插入数据表page_split_convert_rate 成功")
  }

  /**
   * Computes the conversion rate of every split of the target page flow.
   *
   * @param startPageVisitCount page id -> number of visits
   * @param pageSplitMap        split -> number of occurrences
   * @param taskParam           task parameters holding the target page flow
   * @return split -> conversion rate; 0.0 when the split's first page has no recorded visits
   */
  def computePageSplitConvertRate(startPageVisitCount: mutable.HashMap[Long, Long], pageSplitMap: collection.Map[String, Long], taskParam: JSONObject): mutable.HashMap[String,Double] = {
    val targetFlow: String = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
    val convertRateMap = new mutable.HashMap[String, Double]()

    for (split <- adjacentPagePairs(targetFlow)) {
      // The first page of the split is the denominator of the rate.
      val startPageId = split.split("_")(0).toLong
      // A page without an entry is treated as never visited instead of raising NoSuchElementException.
      val startVisits = startPageVisitCount.getOrElse(startPageId, 0L)
      val splitCount = pageSplitMap.getOrElse(split, 0L).toDouble

      // NumberUtils.formatDouble(value, 2) presumably rounds to two decimals (helper not visible here).
      val rate: Double =
        if (startVisits != 0L) NumberUtils.formatDouble(splitCount / startVisits, 2) else 0.0
      convertRateMap.put(split, rate)
    }
    convertRateMap
  }

  /**
   * Counts how many action rows refer to each page of the target page flow.
   *
   * All pages are counted in a single pass over the data rather than one Spark job per page.
   *
   * @param sc                        Spark context (kept for interface compatibility; not used)
   * @param groupedSessionIdActionRDD actions grouped by session id
   * @param taskParam                 task parameters holding the target page flow
   * @return page id -> visit count, with an entry (possibly 0) for every page of the target flow
   */
  def getStartPageVisitCount(sc:SparkContext,groupedSessionIdActionRDD: RDD[(String, Iterable[Row])], taskParam: JSONObject): mutable.HashMap[Long,Long] = {
    val targetPageIds: Set[Long] =
      ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW).split(",").map(_.toLong).toSet

    // Keep only the page ids of interest and let Spark aggregate the occurrences per id.
    val visitCounts: collection.Map[Long, Long] = groupedSessionIdActionRDD
      .flatMap { case (_, actions) =>
        actions.iterator.map(_.getLong(PageIdIdx)).filter(pageId => targetPageIds.contains(pageId))
      }
      .countByValue()

    val result = new mutable.HashMap[Long, Long]()
    targetPageIds.foreach(pageId => result.put(pageId, visitCounts.getOrElse(pageId, 0L)))
    result
  }

  /**
   * Builds the splits ("from_to") of a comma-separated page flow.
   *
   * @param str comma-separated page ids, e.g. "1,2,3"
   * @return the adjacent pairs in flow order, e.g. ["1_2", "2_3"]; empty when the flow has
   *         fewer than two pages
   */
  def getTaskpageSplit(str: String): util.ArrayList[String] = {
    val splits = new util.ArrayList[String]()
    adjacentPagePairs(str).foreach(pair => splits.add(pair))
    splits
  }

  /**
   * Emits one `(split, 1)` record for every occurrence of a target split inside a session.
   *
   * The actions of a session are ordered by action time first, so that a split really means
   * "page A was visited immediately before page B".
   *
   * @param sc                        Spark context used to broadcast the target splits
   * @param groupedSessionIdActionRDD actions grouped by session id
   * @param taskParam                 task parameters holding the target page flow
   * @return RDD of (split, 1L)
   */
  def getPageSplit(sc:SparkContext,groupedSessionIdActionRDD: RDD[(String, Iterable[Row])], taskParam: JSONObject): RDD[(String, Long)] = {
    val targetFlow: String = ParamUtils.getParam(taskParam, Constants.PARAM_TARGET_PAGE_FLOW)
    // A set gives constant-time membership checks on the executors.
    val targetSplitsBc: Broadcast[Set[String]] = sc.broadcast(adjacentPagePairs(targetFlow).toSet)

    groupedSessionIdActionRDD.flatMap { case (_, actions) =>
      val targetSplits = targetSplitsBc.value

      // Parse every timestamp once, then order the session's page ids by action time.
      // Sorting on the Long timestamp avoids the overflow of narrowing a millisecond
      // difference to Int, and the sorted result is what gets traversed below.
      val pageIdsByTime: Vector[Long] = actions.iterator
        .map(row => (DateUtils.parseTime(row.getString(ActionTimeIdx)).getTime, row.getLong(PageIdIdx)))
        .toVector
        .sortBy(_._1)
        .map(_._2)

      // Pair every page with its successor and keep the pairs that belong to the target flow.
      pageIdsByTime.zip(pageIdsByTime.drop(1)).collect {
        case (from, to) if targetSplits.contains(s"${from}_$to") => (s"${from}_$to", 1L)
      }
    }
  }

  /** Splits a comma-separated page flow and joins every adjacent pair of ids as "from_to". */
  private def adjacentPagePairs(pageFlow: String): Seq[String] = {
    val pageIds = pageFlow.split(",").toList
    pageIds.zip(pageIds.drop(1)).map { case (from, to) => s"${from}_$to" }
  }

}
